package com.jl.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Consumer example: tag filtering.
 *
 * <p>Starts a push consumer in group {@code jlConsumerGroup}, subscribes to
 * {@code RocketmqConstant.topic2} with the filter expression {@code "tag-1"},
 * and prints the topic, body and born host of every message it receives.
 *
 * <p>Created 2021/4/5 12:07.
 *
 * @author jl
 */
public class FilterConsumer {

    /**
     * Configures, subscribes and starts the consumer.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jlConsumerGroup");
        consumer.setNamesrvAddr(RocketmqConstant.mq_namesrv_cluster);

        // Subscribe to the topic.
        // Each consumer follows one topic. The second argument is the filter
        // expression: "*" would mean no filtering; here only "tag-1" is requested.
        consumer.subscribe(RocketmqConstant.topic2, "tag-1");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    byte[] body = msg.getBody();
                    // Decode with an explicit charset: new String(byte[]) would use the
                    // platform default, which differs between hosts.
                    // NOTE(review): assumes producers encode the body as UTF-8 - confirm.
                    String content = new String(body, StandardCharsets.UTF_8);
                    String bornHostString = msg.getBornHostString();

                    System.out.println("topic:" + topic +", content=" + content + ", host="+bornHostString);
                }
                // By default a message is consumed by only one consumer (point-to-point).
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("consumer start------------");
    }
}
